1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx.observables;
17
18 import static org.junit.Assert.assertEquals;
19 import static org.junit.Assert.assertTrue;
20 import static org.junit.Assert.fail;
21
22 import java.util.Iterator;
23 import java.util.NoSuchElementException;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26
27 import org.junit.Assert;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.mockito.Mock;
31 import org.mockito.MockitoAnnotations;
32
33 import rx.Observable;
34 import rx.Observable.OnSubscribe;
35 import rx.Subscriber;
36 import rx.exceptions.TestException;
37 import rx.functions.Action0;
38 import rx.functions.Action1;
39 import rx.functions.Func1;
40 import rx.schedulers.Schedulers;
41 import rx.subscriptions.Subscriptions;
42
43 public class BlockingObservableTest {
44
45 @Mock
46 Subscriber<Integer> w;
47
48 @Before
49 public void before() {
50 MockitoAnnotations.initMocks(this);
51 }
52
53 @Test
54 public void testLast() {
55 BlockingObservable<String> obs = BlockingObservable.from(Observable.just("one", "two", "three"));
56
57 assertEquals("three", obs.last());
58 }
59
60 @Test(expected = NoSuchElementException.class)
61 public void testLastEmptyObservable() {
62 BlockingObservable<Object> obs = BlockingObservable.from(Observable.empty());
63 obs.last();
64 }
65
66 @Test
67 public void testLastOrDefault() {
68 BlockingObservable<Integer> observable = BlockingObservable.from(Observable.just(1, 0, -1));
69 int last = observable.lastOrDefault(-100, new Func1<Integer, Boolean>() {
70 @Override
71 public Boolean call(Integer args) {
72 return args >= 0;
73 }
74 });
75 assertEquals(0, last);
76 }
77
78 @Test
79 public void testLastOrDefault1() {
80 BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two", "three"));
81 assertEquals("three", observable.lastOrDefault("default"));
82 }
83
84 @Test
85 public void testLastOrDefault2() {
86 BlockingObservable<Object> observable = BlockingObservable.from(Observable.empty());
87 assertEquals("default", observable.lastOrDefault("default"));
88 }
89
90 @Test
91 public void testLastOrDefaultWithPredicate() {
92 BlockingObservable<Integer> observable = BlockingObservable.from(Observable.just(1, 0, -1));
93 int last = observable.lastOrDefault(0, new Func1<Integer, Boolean>() {
94 @Override
95 public Boolean call(Integer args) {
96 return args < 0;
97 }
98 });
99
100 assertEquals(-1, last);
101 }
102
103 @Test
104 public void testLastOrDefaultWrongPredicate() {
105 BlockingObservable<Integer> observable = BlockingObservable.from(Observable.just(-1, -2, -3));
106 int last = observable.lastOrDefault(0, new Func1<Integer, Boolean>() {
107 @Override
108 public Boolean call(Integer args) {
109 return args >= 0;
110 }
111 });
112 assertEquals(0, last);
113 }
114
115 @Test
116 public void testLastWithPredicate() {
117 BlockingObservable<String> obs = BlockingObservable.from(Observable.just("one", "two", "three"));
118 assertEquals("two", obs.last(new Func1<String, Boolean>() {
119 @Override
120 public Boolean call(String s) {
121 return s.length() == 3;
122 }
123 }));
124 }
125
126 @Test
127 public void testSingle() {
128 BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one"));
129 assertEquals("one", observable.single());
130 }
131
132 @Test
133 public void testSingleDefault() {
134 BlockingObservable<Object> observable = BlockingObservable.from(Observable.empty());
135 assertEquals("default", observable.singleOrDefault("default"));
136 }
137
138 @Test(expected = IllegalArgumentException.class)
139 public void testSingleDefaultPredicateMatchesMoreThanOne() {
140 BlockingObservable.from(Observable.just("one", "two")).singleOrDefault("default", new Func1<String, Boolean>() {
141 @Override
142 public Boolean call(String args) {
143 return args.length() == 3;
144 }
145 });
146 }
147
148 @Test
149 public void testSingleDefaultPredicateMatchesNothing() {
150 BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two"));
151 String result = observable.singleOrDefault("default", new Func1<String, Boolean>() {
152 @Override
153 public Boolean call(String args) {
154 return args.length() == 4;
155 }
156 });
157 assertEquals("default", result);
158 }
159
160 @Test(expected = IllegalArgumentException.class)
161 public void testSingleDefaultWithMoreThanOne() {
162 BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two", "three"));
163 observable.singleOrDefault("default");
164 }
165
166 @Test
167 public void testSingleWithPredicateDefault() {
168 BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two", "four"));
169 assertEquals("four", observable.single(new Func1<String, Boolean>() {
170 @Override
171 public Boolean call(String s) {
172 return s.length() == 4;
173 }
174 }));
175 }
176
177 @Test(expected = IllegalArgumentException.class)
178 public void testSingleWrong() {
179 BlockingObservable<Integer> observable = BlockingObservable.from(Observable.just(1, 2));
180 observable.single();
181 }
182
183 @Test(expected = NoSuchElementException.class)
184 public void testSingleWrongPredicate() {
185 BlockingObservable<Integer> observable = BlockingObservable.from(Observable.just(-1));
186 observable.single(new Func1<Integer, Boolean>() {
187 @Override
188 public Boolean call(Integer args) {
189 return args > 0;
190 }
191 });
192 }
193
194 @Test
195 public void testToIterable() {
196 BlockingObservable<String> obs = BlockingObservable.from(Observable.just("one", "two", "three"));
197
198 Iterator<String> it = obs.toIterable().iterator();
199
200 assertEquals(true, it.hasNext());
201 assertEquals("one", it.next());
202
203 assertEquals(true, it.hasNext());
204 assertEquals("two", it.next());
205
206 assertEquals(true, it.hasNext());
207 assertEquals("three", it.next());
208
209 assertEquals(false, it.hasNext());
210
211 }
212
213 @Test(expected = NoSuchElementException.class)
214 public void testToIterableNextOnly() {
215 BlockingObservable<Integer> obs = BlockingObservable.from(Observable.just(1, 2, 3));
216
217 Iterator<Integer> it = obs.toIterable().iterator();
218
219 Assert.assertEquals((Integer) 1, it.next());
220 Assert.assertEquals((Integer) 2, it.next());
221 Assert.assertEquals((Integer) 3, it.next());
222
223 it.next();
224 }
225
226 @Test(expected = NoSuchElementException.class)
227 public void testToIterableNextOnlyTwice() {
228 BlockingObservable<Integer> obs = BlockingObservable.from(Observable.just(1, 2, 3));
229
230 Iterator<Integer> it = obs.toIterable().iterator();
231
232 Assert.assertEquals((Integer) 1, it.next());
233 Assert.assertEquals((Integer) 2, it.next());
234 Assert.assertEquals((Integer) 3, it.next());
235
236 boolean exc = false;
237 try {
238 it.next();
239 } catch (NoSuchElementException ex) {
240 exc = true;
241 }
242 Assert.assertEquals(true, exc);
243
244 it.next();
245 }
246
247 @Test
248 public void testToIterableManyTimes() {
249 BlockingObservable<Integer> obs = BlockingObservable.from(Observable.just(1, 2, 3));
250
251 Iterable<Integer> iter = obs.toIterable();
252
253 for (int j = 0; j < 3; j++) {
254 Iterator<Integer> it = iter.iterator();
255
256 Assert.assertTrue(it.hasNext());
257 Assert.assertEquals((Integer) 1, it.next());
258 Assert.assertTrue(it.hasNext());
259 Assert.assertEquals((Integer) 2, it.next());
260 Assert.assertTrue(it.hasNext());
261 Assert.assertEquals((Integer) 3, it.next());
262 Assert.assertFalse(it.hasNext());
263 }
264 }
265
266 @Test(expected = TestException.class)
267 public void testToIterableWithException() {
268 BlockingObservable<String> obs = BlockingObservable.from(Observable.create(new Observable.OnSubscribe<String>() {
269
270 @Override
271 public void call(Subscriber<? super String> observer) {
272 observer.onNext("one");
273 observer.onError(new TestException());
274 }
275 }));
276
277 Iterator<String> it = obs.toIterable().iterator();
278
279 assertEquals(true, it.hasNext());
280 assertEquals("one", it.next());
281
282 assertEquals(true, it.hasNext());
283 it.next();
284
285 }
286
287 @Test
288 public void testForEachWithError() {
289 try {
290 BlockingObservable.from(Observable.create(new Observable.OnSubscribe<String>() {
291
292 @Override
293 public void call(final Subscriber<? super String> observer) {
294 new Thread(new Runnable() {
295
296 @Override
297 public void run() {
298 observer.onNext("one");
299 observer.onNext("two");
300 observer.onNext("three");
301 observer.onCompleted();
302 }
303 }).start();
304 }
305 })).forEach(new Action1<String>() {
306
307 @Override
308 public void call(String t1) {
309 throw new RuntimeException("fail");
310 }
311 });
312 fail("we expect an exception to be thrown");
313 } catch (Throwable e) {
314
315 }
316 }
317
318 @Test
319 public void testFirst() {
320 BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two", "three"));
321 assertEquals("one", observable.first());
322 }
323
324 @Test(expected = NoSuchElementException.class)
325 public void testFirstWithEmpty() {
326 BlockingObservable.from(Observable.<String> empty()).first();
327 }
328
329 @Test
330 public void testFirstWithPredicate() {
331 BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two", "three"));
332 String first = observable.first(new Func1<String, Boolean>() {
333 @Override
334 public Boolean call(String args) {
335 return args.length() > 3;
336 }
337 });
338 assertEquals("three", first);
339 }
340
341 @Test(expected = NoSuchElementException.class)
342 public void testFirstWithPredicateAndEmpty() {
343 BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two", "three"));
344 observable.first(new Func1<String, Boolean>() {
345 @Override
346 public Boolean call(String args) {
347 return args.length() > 5;
348 }
349 });
350 }
351
352 @Test
353 public void testFirstOrDefault() {
354 BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two", "three"));
355 assertEquals("one", observable.firstOrDefault("default"));
356 }
357
358 @Test
359 public void testFirstOrDefaultWithEmpty() {
360 BlockingObservable<String> observable = BlockingObservable.from(Observable.<String> empty());
361 assertEquals("default", observable.firstOrDefault("default"));
362 }
363
364 @Test
365 public void testFirstOrDefaultWithPredicate() {
366 BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two", "three"));
367 String first = observable.firstOrDefault("default", new Func1<String, Boolean>() {
368 @Override
369 public Boolean call(String args) {
370 return args.length() > 3;
371 }
372 });
373 assertEquals("three", first);
374 }
375
376 @Test
377 public void testFirstOrDefaultWithPredicateAndEmpty() {
378 BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two", "three"));
379 String first = observable.firstOrDefault("default", new Func1<String, Boolean>() {
380 @Override
381 public Boolean call(String args) {
382 return args.length() > 5;
383 }
384 });
385 assertEquals("default", first);
386 }
387
388 @Test
389 public void testSingleOrDefaultUnsubscribe() throws InterruptedException {
390 final CountDownLatch unsubscribe = new CountDownLatch(1);
391 Observable<Integer> o = Observable.create(new OnSubscribe<Integer>() {
392 @Override
393 public void call(Subscriber<? super Integer> subscriber) {
394 subscriber.add(Subscriptions.create(new Action0() {
395 @Override
396 public void call() {
397 unsubscribe.countDown();
398 }
399 }));
400 subscriber.onNext(1);
401 subscriber.onNext(2);
402
403 }
404 }).subscribeOn(Schedulers.newThread());
405 try {
406 o.toBlocking().singleOrDefault(-1);
407 fail("Expected IllegalArgumentException because there are 2 elements");
408 } catch (IllegalArgumentException e) {
409
410 }
411 assertTrue("Timeout means `unsubscribe` is not called", unsubscribe.await(30, TimeUnit.SECONDS));
412 }
413
414 @Test
415 public void testUnsubscribeFromSingleWhenInterrupted() throws InterruptedException {
416 new InterruptionTests().assertUnsubscribeIsInvoked("single()", new Action1<BlockingObservable<Void>>() {
417 @Override
418 public void call(final BlockingObservable<Void> o) {
419 o.single();
420 }
421 });
422 }
423
424 @Test
425 public void testUnsubscribeFromForEachWhenInterrupted() throws InterruptedException {
426 new InterruptionTests().assertUnsubscribeIsInvoked("forEach()", new Action1<BlockingObservable<Void>>() {
427 @Override
428 public void call(final BlockingObservable<Void> o) {
429 o.forEach(new Action1<Void>() {
430 @Override
431 public void call(final Void aVoid) {
432
433 }
434 });
435 }
436 });
437 }
438
439 @Test
440 public void testUnsubscribeFromFirstWhenInterrupted() throws InterruptedException {
441 new InterruptionTests().assertUnsubscribeIsInvoked("first()", new Action1<BlockingObservable<Void>>() {
442 @Override
443 public void call(final BlockingObservable<Void> o) {
444 o.first();
445 }
446 });
447 }
448
449 @Test
450 public void testUnsubscribeFromLastWhenInterrupted() throws InterruptedException {
451 new InterruptionTests().assertUnsubscribeIsInvoked("last()", new Action1<BlockingObservable<Void>>() {
452 @Override
453 public void call(final BlockingObservable<Void> o) {
454 o.last();
455 }
456 });
457 }
458
459 @Test
460 public void testUnsubscribeFromLatestWhenInterrupted() throws InterruptedException {
461 new InterruptionTests().assertUnsubscribeIsInvoked("latest()", new Action1<BlockingObservable<Void>>() {
462 @Override
463 public void call(final BlockingObservable<Void> o) {
464 o.latest().iterator().next();
465 }
466 });
467 }
468
469 @Test
470 public void testUnsubscribeFromNextWhenInterrupted() throws InterruptedException {
471 new InterruptionTests().assertUnsubscribeIsInvoked("next()", new Action1<BlockingObservable<Void>>() {
472 @Override
473 public void call(final BlockingObservable<Void> o) {
474 o.next().iterator().next();
475 }
476 });
477 }
478
479 @Test
480 public void testUnsubscribeFromGetIteratorWhenInterrupted() throws InterruptedException {
481 new InterruptionTests().assertUnsubscribeIsInvoked("getIterator()", new Action1<BlockingObservable<Void>>() {
482 @Override
483 public void call(final BlockingObservable<Void> o) {
484 o.getIterator().next();
485 }
486 });
487 }
488
489 @Test
490 public void testUnsubscribeFromToIterableWhenInterrupted() throws InterruptedException {
491 new InterruptionTests().assertUnsubscribeIsInvoked("toIterable()", new Action1<BlockingObservable<Void>>() {
492 @Override
493 public void call(final BlockingObservable<Void> o) {
494 o.toIterable().iterator().next();
495 }
496 });
497 }
498
499
500 private static class InterruptionTests {
501
502 private boolean isUnSubscribed;
503 private RuntimeException error;
504 private CountDownLatch latch = new CountDownLatch(1);
505
506 private Observable<Void> createObservable() {
507 return Observable.<Void>never().doOnUnsubscribe(new Action0() {
508 @Override
509 public void call() {
510 isUnSubscribed = true;
511 }
512 });
513 }
514
515 private void startBlockingAndInterrupt(final Action1<BlockingObservable<Void>> blockingAction) {
516 Thread subscriptionThread = new Thread() {
517 @Override
518 public void run() {
519 try {
520 blockingAction.call(createObservable().toBlocking());
521 } catch (RuntimeException e) {
522 if (!(e.getCause() instanceof InterruptedException)) {
523 error = e;
524 }
525 }
526 latch.countDown();
527 }
528 };
529 subscriptionThread.start();
530 subscriptionThread.interrupt();
531 }
532
533 void assertUnsubscribeIsInvoked(final String method, final Action1<BlockingObservable<Void>> blockingAction)
534 throws InterruptedException {
535 startBlockingAndInterrupt(blockingAction);
536 assertTrue("Timeout means interruption is not performed", latch.await(30, TimeUnit.SECONDS));
537 if (error != null) {
538 throw error;
539 }
540 assertTrue("'unsubscribe' is not invoked when thread is interrupted for " + method, isUnSubscribed);
541 }
542
543 }
544
545 }